#include "config.h"

#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>
#include <libgen.h>
#include <ctype.h>
#include <fcntl.h>
#include <libaio.h>
#include <errno.h>
#include <getopt.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "core.h"
#include "cache.h"
#include "disk.h"
#include "sysy_lib.h"
#include "ynet_rpc.h"
#include "job_dock.h"
#include "net_global.h"
#include "diskmd.h"
#include "bh.h"
#include "tpool.h"
#include "lich_md.h"
#include "configure.h"
#include "dbg.h"
#include "replica.h"
#include "core.h"
#include "disk_maping.h"
#include "coroutine.h"
#include "fnotify.h"
#include "redis_utils.h"
#include "disk_redis.h"
#include "locator_rpc.h"

/* Prefix of the per-volume hash names that hold raw chunk records ("raw:<volid>"). */
static const char *__raw__ = "raw";
/* Name of the hash that holds the records of every non-raw chunk. */
static const char *__metadata__ = "metadata";
/* Array of REDIS_GROUP connections; assigned by disk_redis_init(), indexed through db(). */
static redis_conn_t **__conn__;

/*
 * Pick the redis connection responsible for `hash`.
 * Connections are selected by `hash` modulo REDIS_GROUP.
 */
static redis_conn_t *db(uint64_t hash)
{
        const uint64_t slot = hash % REDIS_GROUP;

        return __conn__[slot];
}
        

/* Forward declaration: defined further down, already needed by the delete path. */
static int __disk_redis_diskop(const chkid_t *chkids, const diskloc_t *locs, int count, const char *op);

/*
 * Serialize one chunk record into `buf` as comma separated "name:value" text:
 *   mdvers:0,pool:<pool>,diskid:<n>,offset:<n>,parent:<chkid>,meta:<n>
 * At most MAX_BUF_LEN bytes (terminator included) are written, so `buf` must
 * be at least that large. disk_redis_decode() parses the same layout.
 */
static void __disk_redis_encode(char *buf, const char *pool, const diskloc_t *loc, const chkid_t *parent, uint64_t meta_version)
{
        snprintf(buf,
                 MAX_BUF_LEN,
                 "mdvers:%u,pool:%s,diskid:%u,offset:%u,parent:"CHKID_FORMAT",meta:%ju",
                 0,
                 pool,
                 loc->diskid,
                 loc->idx,
                 CHKID_ARG(parent),
                 meta_version);
}

/*
 * Parse a record produced by __disk_redis_encode().
 *
 * @buf            NUL terminated record text
 * @_pool          optional out: pool name (copied with strcpy, so the caller's
 *                 buffer must be able to hold MAX_NAME_LEN bytes)
 * @_loc           optional out: disk id / offset index
 * @_parent        optional out: parent chunk id
 * @_meta_version  optional out: meta version
 *
 * Any out pointer may be NULL. A record that does not yield all six fields
 * trips YASSERT.
 */
void disk_redis_decode(const char *buf, char *_pool, diskloc_t *_loc, chkid_t *_parent, uint64_t *_meta_version)
{
        int ret;
        uint32_t mdvers;
        diskloc_t loc;
        char parent[MAX_NAME_LEN];
        uint64_t meta_version;
        char pool[MAX_NAME_LEN];
        char fmt[128];

        /*
         * "%[^,]" without a field width writes past the end of pool/parent
         * when a field is longer than the buffer. Build the format at run
         * time so both string conversions are capped to the buffer size;
         * an oversized field then fails the match and is caught by YASSERT
         * instead of corrupting the stack.
         */
        snprintf(fmt, sizeof(fmt),
                 "mdvers:%%u,pool:%%%d[^,],diskid:%%u,offset:%%u,parent:%%%d[^,],meta:%%ju",
                 (int)(sizeof(pool) - 1), (int)(sizeof(parent) - 1));

        ret = sscanf(buf, fmt,
                     &mdvers, pool, &loc.diskid, &loc.idx, parent, &meta_version);
        YASSERT(ret == 6);

        if (_pool)
                strcpy(_pool, pool);

        if (_parent)
                str2chkid(_parent, parent);

        if (_meta_version)
                *_meta_version = meta_version;

        if (_loc)
                *_loc = loc;
}

/* Convert a textual key (as written with id2str()) back into a chunk id via str2chkid(). */
void disk_redis_key(const char *buf, chkid_t *chkid)
{
        const char *text = buf;

        str2chkid(chkid, text);
}

/*
 * Format the name of the redis set that lists the chunks of one disk
 * ("disk:<diskid>"). `set` must hold at least MAX_NAME_LEN bytes.
 */
static void __disk_redis_diskset(int diskid, char *set)
{
        (void) snprintf(set, MAX_NAME_LEN, "disk:%u", diskid);
}

#define REDIS_HASH 100000

static void __disk_redis_key_hash(const chkid_t *chkid, char *hash)
{
        volid_t volid;
        //if (chkid_isvol(chkid)) {
        if (chkid->type == __RAW_CHUNK__) {
                cid2fid(&volid, chkid);

                snprintf(hash, MAX_PATH_LEN, "%s:%s", __raw__, id2str(&volid));
        } else {
                strcpy(hash, __metadata__);
        }
}

/*
 * Look up the record of `chkid` and decode it.
 *
 * The record is read with redis_hget() from the hash chosen by
 * __disk_redis_key_hash(), using id2str(chkid) as the field, on the
 * connection db(chkid->id). The out parameters are forwarded to
 * disk_redis_decode().
 *
 * Returns 0 on success, otherwise the error reported by redis_hget().
 */
static int __disk_redis_get(const chkid_t *chkid, diskloc_t *loc, chkid_t *parent, uint64_t *meta_version, char *pool)
{
        int ret;
        int vlen = MAX_BUF_LEN;
        char field[MAX_BUF_LEN], table[MAX_BUF_LEN], value[MAX_BUF_LEN];

        ANALYSIS_BEGIN(0);

        __disk_redis_key_hash(chkid, table);
        strcpy(field, id2str(chkid));

        ret = redis_hget(db(chkid->id), table, field, value, &vlen);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        disk_redis_decode(value, pool, loc, parent, meta_version);

        ANALYSIS_QUEUE(0, IO_WARN, "__disk_redis_get");

        return 0;
err_ret:
        return ret;
}

/*
 * Store the record of one chunk.
 *
 * Two writes are issued on the connection db(chkid->id), in this order:
 *   1. redis_sset(): add the chunk key to the set of its disk ("disk:<id>")
 *   2. redis_hset(): write the encoded record into the chunk's hash
 * The value length passed to redis_hset() includes the terminating NUL.
 *
 * Returns 0 on success, otherwise the first error encountered.
 */
static int __disk_redis_set(const chkid_t *chkid, const diskloc_t *loc,
                            const chkid_t *parent, uint64_t meta_version, const char *pool)
{
        int ret;
        char field[MAX_NAME_LEN];
        char table[MAX_NAME_LEN];
        char diskset[MAX_NAME_LEN];
        char value[MAX_BUF_LEN];

        ANALYSIS_BEGIN(0);

        strcpy(field, id2str(chkid));
        __disk_redis_diskset(loc->diskid, diskset);

        ret = redis_sset(db(chkid->id), diskset, field);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        __disk_redis_key_hash(chkid, table);
        __disk_redis_encode(value, pool, loc, parent, meta_version);

        ret = redis_hset(db(chkid->id), table, field, value, strlen(value) + 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(0, IO_WARN, "__disk_redis_set");

        return 0;
err_ret:
        return ret;
}

/*
 * Worker for disk_redis_setparent().
 * va_list layout: (const chkid_t *chkid, const chkid_t *parent).
 *
 * Reads the current record and rewrites it with the new parent only when
 * chkid_cmp() reports that the stored parent differs.
 */
static int __disk_redis_setparent(va_list ap)
{
        int ret;
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        const chkid_t *_parent = va_arg(ap, const chkid_t *);
        diskloc_t cur_loc;
        chkid_t cur_parent;
        uint64_t cur_version;
        char cur_pool[MAX_NAME_LEN];

        va_end(ap);

        ret = __disk_redis_get(chkid, &cur_loc, &cur_parent, &cur_version, cur_pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* nothing to write when the stored parent already matches */
        if (!chkid_cmp(&cur_parent, _parent))
                return 0;

        ret = __disk_redis_set(chkid, &cur_loc, _parent, cur_version, cur_pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Update the parent stored for `chkid`.
 * Runs __disk_redis_setparent() through schedule_newthread() with
 * SCHE_THREAD_REDIS and chkid->idx, and returns its result.
 */
int disk_redis_setparent(const chkid_t *chkid, const chkid_t *parent)
{
        int ret;

        ret = schedule_newthread(SCHE_THREAD_REDIS, chkid->idx, FALSE, __FUNCTION__, -1,
                                 __disk_redis_setparent, chkid, parent);

        return ret;
}

/*
 * Worker for disk_redis_setloc().
 * va_list layout: (const chkid_t *chkid, const diskloc_t *loc).
 *
 * Reads the current record and rewrites it with the new location only when
 * the disk id or the index differs from what is stored.
 */
static int __disk_redis_setloc(va_list ap)
{
        int ret;
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        const diskloc_t *_loc = va_arg(ap, const diskloc_t *);
        diskloc_t cur_loc;
        chkid_t cur_parent;
        uint64_t cur_version;
        char cur_pool[MAX_NAME_LEN];

        va_end(ap);

        ret = __disk_redis_get(chkid, &cur_loc, &cur_parent, &cur_version, cur_pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* stored location already matches: skip the write */
        if (_loc->diskid == cur_loc.diskid && _loc->idx == cur_loc.idx)
                return 0;

        ret = __disk_redis_set(chkid, _loc, &cur_parent, cur_version, cur_pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Update the disk location stored for `chkid`.
 * Runs __disk_redis_setloc() through schedule_newthread() with
 * SCHE_THREAD_REDIS and chkid->idx, and returns its result.
 */
int disk_redis_setloc(const chkid_t *chkid, const diskloc_t *loc)
{
        int ret;

        ret = schedule_newthread(SCHE_THREAD_REDIS, chkid->idx, FALSE, __FUNCTION__, -1,
                                 __disk_redis_setloc, chkid, loc);

        return ret;
}

/*
 * Worker for disk_redis_getmetaversion().
 * va_list layout: (const chkid_t *chkid, uint64_t *meta_version).
 *
 * Loads the full record and hands back only its meta version; the output
 * is written only on success.
 */
static int __disk_redis_getmetaversion(va_list ap)
{
        int ret;
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        uint64_t *_meta_version = va_arg(ap, uint64_t *);
        diskloc_t unused_loc;
        chkid_t unused_parent;
        uint64_t version;
        char unused_pool[MAX_NAME_LEN];

        va_end(ap);

        ret = __disk_redis_get(chkid, &unused_loc, &unused_parent, &version, unused_pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_meta_version = version;

        return 0;
err_ret:
        return ret;
}

/*
 * Read the meta version stored for `chkid` into *meta_version.
 * Runs __disk_redis_getmetaversion() through schedule_newthread() with
 * SCHE_THREAD_REDIS and chkid->idx, and returns its result.
 */
int disk_redis_getmetaversion(const chkid_t *chkid, uint64_t *meta_version)
{
        int ret;

        ret = schedule_newthread(SCHE_THREAD_REDIS, chkid->idx, FALSE, __FUNCTION__, -1,
                                 __disk_redis_getmetaversion, chkid, meta_version);

        return ret;
}

/*
 * Worker for disk_redis_setmetaversion().
 * va_list layout: (const chkid_t *chkid, uint64_t meta_version).
 *
 * Reads the current record and rewrites it only when the stored meta
 * version differs from the requested one.
 */
static int __disk_redis_setmetaversion(va_list ap)
{
        int ret;
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        uint64_t _meta_version = va_arg(ap, uint64_t);
        diskloc_t cur_loc;
        chkid_t cur_parent;
        uint64_t cur_version;
        char cur_pool[MAX_NAME_LEN];

        va_end(ap);

        ret = __disk_redis_get(chkid, &cur_loc, &cur_parent, &cur_version, cur_pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* already up to date: skip the write */
        if (cur_version == _meta_version)
                return 0;

        ret = __disk_redis_set(chkid, &cur_loc, &cur_parent, _meta_version, cur_pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Update the meta version stored for `chkid`.
 * Runs __disk_redis_setmetaversion() through schedule_newthread() with
 * SCHE_THREAD_REDIS and chkid->idx, and returns its result.
 */
int disk_redis_setmetaversion(const chkid_t *chkid, uint64_t meta_version)
{
        int ret;

        ret = schedule_newthread(SCHE_THREAD_REDIS, chkid->idx, FALSE, __FUNCTION__, -1,
                                 __disk_redis_setmetaversion, chkid, meta_version);

        return ret;
}

/*
 * Worker for the load path.
 * va_list layout: (const chkid_t *chkid, diskloc_t *loc, chkid_t *parent, char *pool).
 *
 * Fetches the record with __disk_redis_get(); the meta version is decoded
 * into a local and discarded.
 */
static int __disk_redis_load(va_list ap)
{
        int ret;
        const chkid_t *chkid = va_arg(ap, const chkid_t *);
        diskloc_t *loc = va_arg(ap, diskloc_t *);
        chkid_t *parent = va_arg(ap, chkid_t *);
        char *pool = va_arg(ap, char *);
        uint64_t ignored_version;

        va_end(ap);

        ret = __disk_redis_get(chkid, loc, parent, &ignored_version, pool);

        return ret;
}

/*
 * core_request() handler for disk_redis_load().
 * va_list layout: (chkid_t *chkid, diskloc_t *loc, chkid_t *parent, char *pool).
 *
 * Re-dispatches the arguments to __disk_redis_load() through
 * schedule_newthread() with SCHE_THREAD_REDIS and chkid->idx.
 */
static int __disk_redis_load_request(va_list ap)
{
        int ret;
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        diskloc_t *out_loc = va_arg(ap, diskloc_t *);
        chkid_t *out_parent = va_arg(ap, chkid_t *);
        char *out_pool = va_arg(ap, char *);

        va_end(ap);

        ret = schedule_newthread(SCHE_THREAD_REDIS, chkid->idx, FALSE, __FUNCTION__, -1,
                                 __disk_redis_load, chkid, out_loc, out_parent, out_pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Load the location, parent and pool recorded for `chkid`.
 *
 * The request is submitted with core_request() on core_hash(chkid) and
 * handled by __disk_redis_load_request().
 * Returns 0 on success, otherwise the error from core_request().
 */
int disk_redis_load(const chkid_t *chkid, diskloc_t *loc, chkid_t *parent, char *pool)
{
        int ret;

        ret = core_request(core_hash(chkid), -1, "disk sqlite3 load",
                           __disk_redis_load_request,
                           chkid, loc, parent, pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

#if 0
/*
 * Disabled variant (compiled out by the surrounding #if 0): probes every
 * chunk with its own __disk_redis_get() call and treats ENOENT as
 * "does not exist". The active implementation is in the #else branch.
 * va_list layout: (const chkid_t *chkids, int chknum, int *exists).
 */
static int __disk_redis_exist(va_list ap)
{
        int ret;
        const chkid_t *chkids = va_arg(ap, const chkid_t *);
        int chknum = va_arg(ap, int);
        int *exists = va_arg(ap, int *);
        const chkid_t *chkid;
        diskloc_t loc;
        chkid_t parent;
        uint64_t meta_version;
        char pool[MAX_NAME_LEN];

        va_end(ap);

        memset(exists, 0x0, sizeof(int) * chknum);

        for (int i = 0; i < chknum; i++) {
                chkid = &chkids[i];

                ret = __disk_redis_get(chkid, &loc, &parent, &meta_version, pool);
                if (unlikely(ret)) {
                        /* a missing record is not an error: leave exists[i] at 0 */
                        if (ret == ENOENT)
                                continue;
                        else
                                GOTO(err_ret, ret);
                }

                exists[i] = 1;
        }

        return 0;
err_ret:
        return ret;
}

#else

/* Cursor shared with __disk_redis_exist__() while the replies of one HMGET are consumed. */
typedef struct {
        int *exists;              /* caller's result array, one slot per queried chunk */
        int idx;                  /* slot the next reply is written to */
        char hash[MAX_NAME_LEN];  /* hash name, only used in the debug message */
} arg_t;

/*
 * Per-reply callback of the HMGET issued by __disk_redis_exist().
 * arg1 is the reply for one field (NULL when the field is absent), arg2 is
 * the arg_t cursor. Stores 1/0 in the current slot and advances the cursor.
 */
static void __disk_redis_exist__(void *arg1, void *arg2)
{
        arg_t *cursor = arg2;
        const char *reply = arg1;
        const int found = (reply != NULL);

        cursor->exists[cursor->idx] = found ? 1 : 0;

        DBUG("%s[%u] %s\n", cursor->hash, cursor->idx, reply == NULL ? "not exist" : "exist");

        cursor->idx++;
}

/*
 * Worker for disk_redis_exist().
 * va_list layout: (const chkid_t *chkids, int chknum, int *exists).
 *
 * Queries all chunks with a single HMGET. Every chunk must map to the same
 * hash name (asserted); the command is sent on the connection selected by
 * the id of the last chunk. exists[i] is set to 1 when chunk i has a
 * record and to 0 otherwise.
 *
 * An empty batch (chknum <= 0) returns 0 without touching redis: the
 * original code dereferenced chkids[0] and read an uninitialised chunk
 * pointer in that case.
 */
static int __disk_redis_exist(va_list ap)
{
        int ret;
        const chkid_t *chkids = va_arg(ap, const chkid_t *);
        int chknum = va_arg(ap, int);
        int *exists = va_arg(ap, int *);
        const chkid_t *chkid = NULL;
        mctx_t mctx;
        arg_t arg;
        char key[MAX_NAME_LEN], hash[MAX_NAME_LEN], tmp[MAX_NAME_LEN];

        va_end(ap);

        if (chknum <= 0)
                return 0;

        memset(exists, 0x0, sizeof(int) * chknum);

        ANALYSIS_BEGIN(0);

        redis_multi_init(&mctx);

        __disk_redis_key_hash(&chkids[0], hash);
        for (int i = 0; i < chknum; i++) {
                chkid = &chkids[i];

                /* one HMGET addresses one hash, so the batch must not span hashes */
                __disk_redis_key_hash(chkid, tmp);
                YASSERT(strcmp(tmp, hash) == 0);
                strcpy(key, id2str(chkid));

                redis_multi_append(&mctx, key, NULL, 0);
        }

        arg.exists = exists;
        arg.idx = 0;
        strcpy(arg.hash, hash);
        ANALYSIS_BEGIN(3);

        ret = redis_multi_exec(db(chkid->id), "HMGET", hash, &mctx,
                               __disk_redis_exist__, &arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        redis_multi_destory(&mctx);

        ANALYSIS_QUEUE(3, IO_WARN, "__disk_redis_exist_exec");
        /* the callback must have been invoked exactly once per chunk */
        YASSERT(arg.idx == chknum);

        ANALYSIS_QUEUE(0, IO_WARN, "__disk_redis_exist");

        return 0;
err_ret:
        redis_multi_destory(&mctx);
        return ret;
}

#endif

/*
 * core_request() handler for disk_redis_exist().
 * va_list layout: (chkid_t *chkids, int chknum, int *exists).
 *
 * Re-dispatches to __disk_redis_exist() through schedule_newthread() with
 * SCHE_THREAD_REDIS and the idx of the first chunk.
 */
static int __disk_redis_exist_task(va_list ap)
{
        int ret;
        const chkid_t *batch = va_arg(ap, chkid_t *);
        int batch_len = va_arg(ap, int);
        int *result = va_arg(ap, int *);

        va_end(ap);

        ret = schedule_newthread(SCHE_THREAD_REDIS, batch[0].idx, FALSE, __FUNCTION__, -1,
                                 __disk_redis_exist, batch, batch_len, result);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Report for each of `chknum` chunks whether a record exists.
 * exists[i] receives 1 or 0 for chkids[i].
 *
 * The request is submitted with core_request() on core_hash(chkids) and
 * handled by __disk_redis_exist_task().
 * Returns 0 on success, otherwise the error from core_request().
 */
int disk_redis_exist(const chkid_t *chkids, int chknum, int *exists)
{
        int ret;

        ret = core_request(core_hash(chkids), -1, "disk_redis_exist_task",
                           __disk_redis_exist_task, chkids, chknum, exists);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Remove the records of `count` chunks with a single HDEL.
 *
 * Every chunk must map to the same hash name (asserted); the command is
 * sent on the connection selected by the id of the last chunk.
 *
 * An empty batch (count <= 0) returns 0 without touching redis: the
 * original code dereferenced chkids[0] and read an uninitialised chunk
 * pointer in that case.
 */
static int __disk_redis_del____(const chkid_t *chkids, int count)
{
        int ret;
        mctx_t mctx;
        const chkid_t *chkid = NULL;
        char key[MAX_NAME_LEN], hash[MAX_NAME_LEN], tmp[MAX_NAME_LEN];

        if (count <= 0)
                return 0;

        redis_multi_init(&mctx);

        __disk_redis_key_hash(&chkids[0], hash);

        for (int i = 0; i < count; i++) {
                chkid = &chkids[i];

                strcpy(key, id2str(chkid));
                /* one HDEL addresses one hash, so the batch must not span hashes */
                __disk_redis_key_hash(chkid, tmp);
                YASSERT(strcmp(tmp, hash) == 0);

                redis_multi_append(&mctx, key, NULL, 0);
        }

        ret = redis_multi_exec(db(chkid->id), "HDEL", hash, &mctx, NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        redis_multi_destory(&mctx);

        return 0;
err_ret:
        redis_multi_destory(&mctx);
        return ret;
}


/*
 * Worker for the delete path.
 * va_list layout: (const chkid_t *chkids, const diskloc_t *locs, uint32_t chknum).
 *
 * Step 0 removes the chunk records (HDEL), step 1 removes the chunk keys
 * from their per-disk sets (SREM). The first failing step aborts the
 * operation; nothing is rolled back.
 */
static int __disk_redis_del__(va_list ap)
{
        int ret;
        const chkid_t *batch = va_arg(ap, const chkid_t *);
        const diskloc_t *batch_locs = va_arg(ap, const diskloc_t *);
        uint32_t batch_len = va_arg(ap, uint32_t);

        va_end(ap);

        ANALYSIS_BEGIN(0);

        ret = __disk_redis_del____(batch, batch_len);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(0, IO_WARN, "__disk_redis_del__:0");

        ANALYSIS_BEGIN(1);

        ret = __disk_redis_diskop(batch, batch_locs, batch_len, "SREM");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(1, IO_WARN, "__disk_redis_del__:1");

        return 0;
err_ret:
        return ret;
}

/*
 * Run __disk_redis_del__() through schedule_newthread() with
 * SCHE_THREAD_REDIS and the idx of the first chunk; returns its result.
 */
static int __disk_redis_del(const chkid_t *chkids, const diskloc_t *locs, uint32_t count)
{
        int ret;

        ret = schedule_newthread(SCHE_THREAD_REDIS, chkids[0].idx, FALSE, __FUNCTION__, -1,
                                 __disk_redis_del__, chkids, locs, count);

        return ret;
}

/*
 * core_request() handler for disk_redis_del().
 * va_list layout: (chkid_t *chkids, const diskloc_t *locs, uint32_t count).
 * Unpacks the arguments and forwards them to __disk_redis_del().
 */
static int __disk_redis_del_request(va_list ap)
{
        int ret;
        const chkid_t *batch = va_arg(ap, chkid_t *);
        const diskloc_t *batch_locs = va_arg(ap, const diskloc_t *);
        uint32_t batch_len = va_arg(ap, uint32_t);

        va_end(ap);

        ret = __disk_redis_del(batch, batch_locs, batch_len);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Delete the records of `count` chunks together with their per-disk set
 * entries. chkid and loc are parallel arrays of `count` elements.
 *
 * The request is submitted with core_request() on core_hash(chkid) and
 * handled by __disk_redis_del_request().
 * Returns 0 on success, otherwise the error from core_request().
 */
int disk_redis_del(const chkid_t *chkid, const diskloc_t *loc, uint32_t count)
{
        int ret;

        ret = core_request(core_hash(chkid), -1, "disk sqlite3 del",
                           __disk_redis_del_request,
                           chkid, loc, count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * One chunk queued for a per-disk set operation.
 * `hook` must stay the first member: list nodes are cast straight to this type.
 */
typedef struct {
        struct list_head hook;   /* link in disk_sort_list_t.list */
        const chkid_t *chkid;    /* borrowed pointer into the caller's chunk array */
        const diskloc_t *loc;    /* borrowed pointer into the caller's location array */
} disk_sort_t; 

/*
 * All chunks of one batch that live on the same disk.
 * `hook` must stay the first member: list nodes are cast straight to this type.
 */
typedef struct {
        struct list_head hook;   /* link in the per-batch list of disks */
        struct list_head list;   /* disk_sort_t entries for this disk */
        int diskid;              /* disk the entries belong to */
        int count;               /* number of entries in `list` */
} disk_sort_list_t;

/*
 * File one chunk under its disk.
 *
 * `list` holds one disk_sort_list_t per disk seen so far. A new entry for
 * (chkid, loc) is appended to the group whose diskid equals loc->diskid;
 * when no such group exists it is created and appended to `list`.
 * Allocation failures are handled by UNIMPLEMENTED(__DUMP__).
 */
static void __disk_redis_diskop_sort(struct list_head *list, const chkid_t *chkid,
                                  const diskloc_t *loc)
{
        int ret;
        struct list_head *iter;
        disk_sort_list_t *group;
        disk_sort_t *entry;

        ret = ymalloc((void **)&entry, sizeof(*entry));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        entry->chkid = chkid;
        entry->loc = loc;

        /* try to join an existing group first */
        list_for_each(iter, list) {
                group = (disk_sort_list_t *)iter;
                if (group->diskid != loc->diskid)
                        continue;

                list_add_tail(&entry->hook, &group->list);
                group->count++;
                DBUG("add "CHKID_FORMAT" to disk[%u], count %u\n",
                      CHKID_ARG(chkid), loc->diskid, group->count);
                return;
        }

        /* first chunk for this disk: open a new group */
        ret = ymalloc((void **)&group, sizeof(*group));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        group->diskid = loc->diskid;
        INIT_LIST_HEAD(&group->list);
        list_add_tail(&group->hook, list);
        list_add_tail(&entry->hook, &group->list);
        group->count = 1;
        DBUG("new disk[%u]\n", loc->diskid);
        DBUG("add "CHKID_FORMAT" to disk[%u], count %u\n",
              CHKID_ARG(chkid), loc->diskid, group->count);
}

/*
 * Apply set command `op` to the set of one disk for every chunk in
 * `sort_list`, as a single multi-argument command on db(hash).
 *
 * All entries of sort_list->list are unlinked and freed while the command
 * is assembled, i.e. before it is executed, so the entry list is empty on
 * return whether or not the command succeeds. `sort_list` itself is not
 * freed here.
 */
static int __disk_redis_diskop_commit(uint64_t hash, disk_sort_list_t *sort_list, const char *op)
{
        int ret;
        int queued = 0;
        disk_sort_t *entry;
        char member[MAX_NAME_LEN];
        char diskset[MAX_NAME_LEN];
        mctx_t mctx;
        struct list_head *pos, *n;

        redis_multi_init(&mctx);

        __disk_redis_diskset(sort_list->diskid, diskset);

        ANALYSIS_BEGIN(0);

        list_for_each_safe(pos, n, &sort_list->list) {
                entry = (disk_sort_t *)pos;
                list_del(pos);

                strcpy(member, id2str(entry->chkid));
                redis_multi_append(&mctx, member, NULL, 0);
                queued++;

                DBUG("queue %s "CHKID_FORMAT" to disk[%u], count %u\n",
                      op, CHKID_ARG(entry->chkid), sort_list->diskid, queued);

                yfree((void **)&pos);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "__disk_redis_diskop_commit:0");
        ANALYSIS_BEGIN(1);

        ret = redis_multi_exec(db(hash), op, diskset, &mctx, NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(1, IO_WARN, "__disk_redis_diskop_commit:1");

        redis_multi_destory(&mctx);

        return 0;
err_ret:
        redis_multi_destory(&mctx);
        return ret;
}


static int __disk_redis_diskop(const chkid_t *chkids, const diskloc_t *locs,
                               int count, const char *op)
{
        int ret;
        struct list_head list, *pos, *n;
        uint64_t hash = -1;

        INIT_LIST_HEAD(&list);
        
        for (int i = 0; i < count; i++) {
                if (hash == -1) {
                        hash = chkids[0].id;
                }

                YASSERT(hash == chkids[i].id);
                
                __disk_redis_diskop_sort(&list, &chkids[i], &locs[i]);
        }

        list_for_each_safe(pos, n, &list) {
                list_del(pos);

                ret = __disk_redis_diskop_commit(hash, (void *)pos, op);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                yfree((void **)&pos);
        }

        YASSERT(list_empty(&list));
        
        return 0;
err_ret:
        return ret;
}

/*
 * Write the records of `count` chunks with a single HMSET.
 *
 * chkids and locs are parallel arrays; parent, meta_version and pool are
 * shared by every record. Every chunk must map to the same hash name
 * (asserted); the command is sent on the connection selected by the id of
 * the last chunk. Each value is stored including its terminating NUL.
 *
 * An empty batch (count <= 0) returns 0 without touching redis: the
 * original code dereferenced chkids[0] and read an uninitialised chunk
 * pointer in that case.
 */
static int __disk_redis_create__(const chkid_t *chkids, const diskloc_t *locs, int count,
                                 const chkid_t *parent, uint64_t meta_version, const char *pool)
{
        int ret;
        mctx_t mctx;
        const chkid_t *chkid = NULL;
        const diskloc_t *loc;
        char key[MAX_NAME_LEN], value[MAX_BUF_LEN], hash[MAX_NAME_LEN], tmp[MAX_NAME_LEN];

        if (count <= 0)
                return 0;

        redis_multi_init(&mctx);

        __disk_redis_key_hash(&chkids[0], hash);

        for (int i = 0; i < count; i++) {
                chkid = &chkids[i];
                loc = &locs[i];

                strcpy(key, id2str(chkid));
                /* one HMSET addresses one hash, so the batch must not span hashes */
                __disk_redis_key_hash(chkid, tmp);
                YASSERT(strcmp(tmp, hash) == 0);

                __disk_redis_encode(value, pool, loc, parent, meta_version);

                redis_multi_append(&mctx, key, value, strlen(value) + 1);
        }

        ret = redis_multi_exec(db(chkid->id), "HMSET", hash, &mctx, NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        redis_multi_destory(&mctx);

        return 0;
err_ret:
        redis_multi_destory(&mctx);
        return ret;
}

/*
 * Worker for disk_redis_create().
 * va_list layout: (const char *pool, const chkid_t *chkids,
 *                  const diskloc_t *locs, int chknum,
 *                  const chkid_t *parent, uint64_t meta_version).
 *
 * Step 0 adds the chunk keys to their per-disk sets (SADD), step 1 writes
 * the records (HMSET). The first failing step aborts the operation;
 * nothing is rolled back.
 */
static int __disk_redis_create(va_list ap)
{
        int ret;
        const char *pool = va_arg(ap, const char *);
        const chkid_t *batch = va_arg(ap, const chkid_t *);
        const diskloc_t *batch_locs = va_arg(ap, const diskloc_t *);
        int batch_len = va_arg(ap, int);
        const chkid_t *parent = va_arg(ap, const chkid_t *);
        uint64_t version = va_arg(ap, uint64_t);

        va_end(ap);

        ANALYSIS_BEGIN(0);

        ret = __disk_redis_diskop(batch, batch_locs, batch_len, "SADD");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(0, IO_WARN, "__disk_redis_create:0");

        ANALYSIS_BEGIN(1);

        ret = __disk_redis_create__(batch, batch_locs, batch_len, parent, version, pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(1, IO_WARN, "__disk_redis_create:1");

        return 0;
err_ret:
        return ret;
}

/*
 * Create records for `chknum` chunks.
 *
 * chkid and loc are parallel arrays; pool, parent and meta_version are
 * shared by every record. `flag` is accepted but ignored.
 * Runs __disk_redis_create() through schedule_newthread() with
 * SCHE_THREAD_REDIS and chkid->idx, and returns its result.
 */
int disk_redis_create(const char *pool, const chkid_t *chkid, const diskloc_t *loc, int chknum,
                        const chkid_t *parent,
                        uint64_t meta_version, int flag)
{
        int ret;

        (void) flag;

        ret = schedule_newthread(SCHE_THREAD_REDIS, chkid->idx, FALSE, __FUNCTION__, -1,
                                 __disk_redis_create,
                                 pool, chkid, loc, chknum, parent, meta_version);

        return ret;
}

int disk_redis_init(const char *home)
{
        int ret;
        char sock[MAX_PATH_LEN];
        redis_conn_t **conn;

        ret = ymalloc((void **)&conn, sizeof(*conn) * REDIS_GROUP);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        
        for (int i = 0; i < REDIS_GROUP; i++ ) {
                snprintf(sock, MAX_PATH_LEN, "%s/data/redis/%u/redis.sock", gloconf.home, i);

                ret = redis_connect(&conn[i], NULL, sock);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        __conn__ =  conn;
        
        return 0;
err_ret:
        return ret;
}

/*
 * Store in *chunk_count the size of the "disk:<diskid>" set as reported by
 * redis_scount() on the connection db(diskid).
 *
 * NOTE(review): the writers in this file add set members on the connection
 * chosen by the chunk id, not by the disk id, so this may see only part of
 * the disk's chunks - confirm the intended sharding.
 */
int disk_redis_count(int diskid, uint64_t *chunk_count)
{
        int ret;
        char diskset[MAX_NAME_LEN];

        __disk_redis_diskset(diskid, diskset);

        ret = redis_scount(db(diskid), diskset, chunk_count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Shutdown hook. Not implemented: it only invokes UNIMPLEMENTED(__WARN__);
 * nothing allocated by disk_redis_init() is released here.
 */
void disk_redis_close()
{
        UNIMPLEMENTED(__WARN__);
}

/*
 * Context handed to the iterator callbacks in this file. Each call site
 * fills in only the fields that its callback reads.
 */
typedef struct {
        func2_t func;       /* user callback for the hash iterators */
        dm_itor_t func1;    /* decoded-record callback, used by the by-disk iterator */
        int flag;           /* DM_FLAG_* filter, read by __disk_redis_iterator_bydisk */
        volid_t volid;      /* volume filter, read by __disk_redis_iterator_byvol */
        int db;             /* value passed to db(), read by __disk_redis_iterator_table */
        const char *type;   /* label printed in the by-disk scan/skip log lines */
        void *arg;          /* opaque pointer forwarded to the callback */
} args_t;

/*
 * Filter callback for the "metadata" hash: forwards an entry to the user
 * callback only when the chunk id decoded from `key` has the same id as
 * the requested volume.
 */
static void __disk_redis_iterator_byvol(void *key, void *value, void *_arg)
{
        args_t *ctx = _arg;
        chkid_t chkid;

        disk_redis_key(key, &chkid);

        /* entry belongs to another volume */
        if (ctx->volid.id != chkid.id)
                return;

        ctx->func(key, value, ctx->arg);
}

/*
 * Invoke func(key, value, arg) for every record that belongs to `volid`.
 *
 * Two hashes on the connection db(volid->id) are visited:
 *   1. the volume's raw-chunk hash "raw:<volid>" - every entry is reported;
 *   2. the shared "metadata" hash - entries are filtered by volume id.
 *
 * The raw hash name is built exactly like __disk_redis_key_hash() builds
 * it. The previous code iterated the hash "<volid>" (without the "raw:"
 * prefix), which no writer in this file ever creates.
 *
 * Iterator failures are handled by UNIMPLEMENTED(__DUMP__).
 */
void disk_redis_iterator_byvol(const volid_t *volid, func2_t func, void *arg)
{
        int ret;
        char hash[MAX_NAME_LEN];
        args_t args;

        snprintf(hash, sizeof(hash), "%s:%s", __raw__, id2str(volid));

        ret = redis_hiterator(db(volid->id), hash, func, arg);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        args.func = func;
        args.arg = arg;
        args.volid = *volid;
        ret = redis_hiterator(db(volid->id), __metadata__, __disk_redis_iterator_byvol, &args);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);
}

/*
 * redis_keys() callback: `_hash` is the name of one top-level key.
 * Keys whose name starts with "raw" are iterated with redis_hiterator() on
 * db(args->db), forwarding args->func / args->arg; all other keys are
 * skipped. Iterator failures are handled by UNIMPLEMENTED(__DUMP__).
 */
static void __disk_redis_iterator_table(void *_hash, void *_arg)
{
        int ret;
        args_t *ctx = _arg;
        const char *name = _hash;

        if (strncmp(name, __raw__, strlen(__raw__)) != 0) {
                DINFO("skip %s\n", name);
                return;
        }

        DINFO("scan %s\n", name);

        ret = redis_hiterator(db(ctx->db), name, ctx->func, ctx->arg);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);
}


/*
 * Visit every record of one table on all REDIS_GROUP connections, one
 * connection after the other.
 *
 * table == "metadata": the "metadata" hash of each connection is iterated.
 * any other table:     the keys of each connection are enumerated and the
 *                      ones accepted by __disk_redis_iterator_table() are
 *                      iterated.
 * func is called with (key, value, _arg). Failures are handled by
 * UNIMPLEMENTED(__DUMP__).
 */
static void __disk_redis_iterator(const char *table, func2_t func, void *_arg)
{
        int ret;
        args_t ctx;

        if (strcmp(table, __metadata__) != 0) {
                ctx.func = func;
                ctx.arg = _arg;

                for (int i = 0; i < REDIS_GROUP; i++ ) {
                        /* lets the key callback pick the same connection */
                        ctx.db = i;
                        ret = redis_keys(__conn__[i], __disk_redis_iterator_table, &ctx);
                        if (unlikely(ret))
                                UNIMPLEMENTED(__DUMP__);
                }

                return;
        }

        for (int i = 0; i < REDIS_GROUP; i++ ) {
                ret = redis_hiterator(__conn__[i], __metadata__, func, _arg);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                DINFO("scan %s @ db %u\n", table, i);
        }
}

/*
 * Adapter used by disk_redis_iterator(): decodes the chunk id from the key
 * and the pool name from the value, then calls the user callback as
 * func(arg, &chkid, pool).
 */
static void __disk_redis_iterator_decode(void *_key, void *_value, void *_arg)
{
        args_t *ctx = _arg;
        const char *key_text = _key;
        const char *record = _value;
        char pool[MAX_NAME_LEN];
        chkid_t chkid;

        disk_redis_key(key_text, &chkid);
        /* only the pool name is needed from the record */
        disk_redis_decode(record, pool, NULL, NULL, NULL);

        ctx->func(ctx->arg, &chkid, pool);
}

/*
 * Iterate `table` and report each record in decoded form: the user
 * callback is invoked through __disk_redis_iterator_decode() as
 * func(arg, &chkid, pool).
 */
void disk_redis_iterator(const char *table, func2_t func, void *arg)
{
        args_t ctx;

        ctx.arg = arg;
        ctx.func = func;

        __disk_redis_iterator(table, __disk_redis_iterator_decode, &ctx);
}

/*
 * Iterate `table` and hand the undecoded (key, value, arg) triples
 * straight to func. Thin wrapper around __disk_redis_iterator().
 */
void disk_redis_iterator_raw(const char *table, func2_t func, void *arg)
{
        func2_t visitor = func;

        __disk_redis_iterator(table, visitor, arg);
}

/*
 * redis_siterator() callback of disk_redis_iterator_bydisk(): `key` is one
 * member of a per-disk set, i.e. a textual chunk id.
 *
 * The chunk is reported when args->flag selects its kind (DM_FLAG_MD for
 * non-raw chunks, DM_FLAG_RAW for raw chunks). For a selected chunk the
 * record is fetched with redis_hget() on db(chkid.id), decoded and passed
 * to args->func1. A member whose record cannot be read is logged with
 * DWARN and skipped.
 */
static void __disk_redis_iterator_bydisk(void *key, void *_arg)
{
        args_t *args = _arg;
        chkid_t chkid, parent;
        int ret, vlen, is_raw, selected;
        char table[MAX_BUF_LEN], record[MAX_BUF_LEN];
        char pool[MAX_NAME_LEN];
        diskloc_t loc;
        uint64_t meta_version;

        disk_redis_key(key, &chkid);

        is_raw = (chkid.type == __RAW_CHUNK__);
        selected = ((args->flag & DM_FLAG_MD) && !is_raw)
                || ((args->flag & DM_FLAG_RAW) && is_raw);

        if (!selected) {
                DINFO("skip %s %s\n", args->type, key);
                return;
        }

        DINFO("scan %s %s\n", args->type, key);

        __disk_redis_key_hash(&chkid, table);
        vlen = MAX_BUF_LEN;
        ret = redis_hget(db(chkid.id), table, key, record, &vlen);
        if (unlikely(ret)) {
                DWARN("%s @ %s not found\n", key, table);
                return;
        }

        disk_redis_decode(record, pool, &loc, &parent, &meta_version);
        args->func1(&chkid, pool, &loc, &parent, meta_version, args->arg);
}


/*
 * Report every chunk recorded for disk `diskid`.
 *
 * The members of the "disk:<diskid>" set on db(diskid) are walked with
 * redis_siterator(); __disk_redis_iterator_bydisk() filters them by `flag`
 * (DM_FLAG_MD / DM_FLAG_RAW) and calls func for each selected chunk.
 * Iterator failures are handled by UNIMPLEMENTED(__DUMP__).
 *
 * The callback prints args.type with "%s"; it was left uninitialised
 * before, so the context is now zeroed and `type` is set to the name of
 * the set being scanned.
 *
 * NOTE(review): the writers in this file add set members on the connection
 * chosen by the chunk id, not by the disk id - confirm that db(diskid) is
 * the intended connection.
 */
void disk_redis_iterator_bydisk(int diskid, dm_itor_t func, void *arg, int flag)
{
        int ret;
        char disk[MAX_NAME_LEN];
        args_t args;

        __disk_redis_diskset(diskid, disk);

        memset(&args, 0x0, sizeof(args));
        args.func1 = func;
        args.arg = arg;
        args.flag = flag;
        args.type = disk;

        ret = redis_siterator(db(diskid), disk, __disk_redis_iterator_bydisk, &args);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);
}

/* Per-thread work item of disk_redis_iterator_mt(): one instance per redis connection. */
typedef struct {
        //int retval;
        int idx;                 /* index into __conn__ handled by this thread */
        void *arg;               /* opaque pointer forwarded to func */
        const char *table;       /* table to scan ("metadata" or anything else for raw) */
        const char *condition;   /* not referenced by the visible code */
        func2_t func;            /* per-record callback */
        sem_t sem;               /* posted by the worker when its scan is finished */
} args1_t;

static void *__disk_redis_iterator_mt(void *_args)
{
        int ret;
        args1_t *args1 = _args;
        args_t args;

        if (strcmp(args1->table, __metadata__) == 0) {
                ret = redis_hiterator(__conn__[args1->idx], __metadata__, args1->func, args1->arg);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                DINFO("scan %s @ db %u\n", args1->table, args1->idx);
        } else {
                args.func = args1->func;
                args.arg = args1->arg;

                ret = redis_keys(__conn__[args1->idx], __disk_redis_iterator_table, &args);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);
        }
        
        sem_post(&args1->sem);
        
        return NULL;
}

/*
 * Scan `table` on all REDIS_GROUP connections in parallel.
 *
 * One thread per connection is started with sy_thread_create2(); each runs
 * __disk_redis_iterator_mt() and posts its semaphore when finished. The
 * function returns after every thread has posted, so `func` may be called
 * concurrently from several threads.
 *
 * Compared with the original: a sem_wait() interrupted by a signal (EINTR)
 * is retried instead of being treated as fatal, and each semaphore is
 * destroyed once its worker has finished. Other failures are still handled
 * by UNIMPLEMENTED(__DUMP__).
 */
void disk_redis_iterator_mt(const char *table, func2_t func, void *arg)
{
        int ret, i;
        args1_t array[REDIS_GROUP], *args;

        for (i = 0; i < REDIS_GROUP; i++) {
                args = &array[i];
                //args->retval = 0;
                args->idx = i;
                args->arg = arg;
                args->table = table;
                args->condition = NULL;
                args->func = func;
                ret = sem_init(&args->sem, 0, 0);
                if (ret)
                        UNIMPLEMENTED(__DUMP__);

                ret = sy_thread_create2(__disk_redis_iterator_mt, args, "disk_redis_iterator");
                if (ret)
                        UNIMPLEMENTED(__DUMP__);
        }

        for (i = 0; i < REDIS_GROUP; i++) {
                args = &array[i];

                /* a signal may interrupt the wait; that is not a failure */
                do {
                        ret = sem_wait(&args->sem);
                } while (ret == -1 && errno == EINTR);

                if (ret)
                        UNIMPLEMENTED(__DUMP__);

                sem_destroy(&args->sem);
        }
}

/* Context of __disk_redis_iterator_new(): the decoded-record callback and its opaque argument. */
typedef struct {
        void *arg;        /* forwarded unchanged as the last argument of func */
        dm_itor_t func;   /* called once per decoded record */
} arg_new_t;

/*
 * Hash iterator callback that fully decodes one record and passes it to
 * the dm_itor_t callback stored in the arg_new_t context:
 *   func(&chkid, pool, &loc, &parent, meta_version, arg)
 *
 * The log line used "%u" for the 64-bit meta version, which does not match
 * the argument type; it is now printed with "%ju".
 */
static void __disk_redis_iterator_new(void *key, void *value, void *_arg)
{
        arg_new_t *arg = _arg;
        chkid_t chkid, parent;
        char pool[MAX_NAME_LEN];
        diskloc_t loc;
        uint64_t meta_version;

        disk_redis_key(key, &chkid);
        disk_redis_decode(value, pool, &loc, &parent, &meta_version);

        DINFO("chunk "CHKID_FORMAT" pool %s disk(%u, %u) parent "CHKID_FORMAT" meta_version %ju\n",
              CHKID_ARG(&chkid), pool, loc.diskid, loc.idx, CHKID_ARG(&parent),
              (uintmax_t)meta_version);
        arg->func(&chkid, pool, &loc, &parent, meta_version, arg->arg);
}

/*
 * Report every record selected by `flag` to func in decoded form.
 *
 * DM_FLAG_MD selects the "metadata" table, DM_FLAG_RAW the "raw" tables;
 * metadata is scanned first when both are set. With DM_FLAG_MT the scan
 * uses disk_redis_iterator_mt(), otherwise disk_redis_iterator_raw().
 */
void disk_redis_iterator_new(dm_itor_t func, void *_arg, int flag)
{
        arg_new_t ctx;
        void (*scan)(const char *, func2_t, void *);

        ctx.func = func;
        ctx.arg = _arg;

        /* same two passes either way; only the scanning strategy differs */
        if (flag & DM_FLAG_MT)
                scan = disk_redis_iterator_mt;
        else
                scan = disk_redis_iterator_raw;

        if (flag & DM_FLAG_MD)
                scan("metadata", __disk_redis_iterator_new, &ctx);

        if (flag & DM_FLAG_RAW)
                scan("raw", __disk_redis_iterator_new, &ctx);
}

/*
 * dm_itor_t flavoured front end of disk_redis_iterator_byvol(): every
 * record of `volid` is decoded by __disk_redis_iterator_new() and then
 * handed to func together with _arg.
 */
static void __disk_redis_iterator_byvol_new(const volid_t *volid, dm_itor_t func, void *_arg)
{
        arg_new_t ctx;

        ctx.func = func;
        ctx.arg = _arg;

        disk_redis_iterator_byvol(volid, __disk_redis_iterator_new, &ctx);
}

static int __disk_redis_cleanup__(const chkid_t *chkid, const char *pool, const diskloc_t *loc,
                                  const chkid_t *parent, const uint64_t meta, void *ctx)
{
        const char *_pool = ctx;

        if (strcmp(pool, _pool) == 0) {
                disk_redis_del(chkid, loc, 1);
        }

        return 0;
}

/*
 * Delete every chunk record (raw and metadata) that belongs to `pool`.
 * Scans all records with disk_redis_iterator_new() and lets
 * __disk_redis_cleanup__() delete the matching ones. Always returns 0.
 */
static int __disk_redis_cleanup(const char *pool)
{
        const int what = DM_FLAG_RAW | DM_FLAG_MD;

        disk_redis_iterator_new(__disk_redis_cleanup__, (void *)pool, what);

        return 0;
}


/*
 * Scheduler-thread ops descriptor registered by disk_redis_ops_register()
 * for SCHE_THREAD_REDIS; no transaction hooks are installed.
 */
struct sche_thread_ops redis_ops = {
        .type           = SCHE_THREAD_REDIS,
        .begin_trans    = NULL,
        .commit_trans   = NULL,
};

/*
 * Register redis_ops with sche_thread_ops_register(), passing
 * REDIS_GROUP * 2 as the third argument, and return its result.
 */
int disk_redis_ops_register()
{
        int ret;

        ret = sche_thread_ops_register(&redis_ops, redis_ops.type, REDIS_GROUP * 2);

        return ret;
}

/* Redis implementation of the disk_maping_t operation table; handed out by disk_maping_redis(). */
static disk_maping_t __disk_maping__ = {
        .init = disk_redis_init,
        .close = disk_redis_close,
        .create = disk_redis_create,
        .del = disk_redis_del,
        .setparent = disk_redis_setparent,
        .setloc = disk_redis_setloc,
        .getmetaversion = disk_redis_getmetaversion,
        .setmetaversion = disk_redis_setmetaversion,
        .load = disk_redis_load,
        .exist = disk_redis_exist,
        .cleanup = __disk_redis_cleanup,
        .count = disk_redis_count,
        .iterator_bydisk = disk_redis_iterator_bydisk,
        .iterator_byvol = __disk_redis_iterator_byvol_new,
        .iterator = disk_redis_iterator,
        .iterator_new = disk_redis_iterator_new,
};

/* Return the redis-backed disk_maping_t operation table (a file-scope static). */
disk_maping_t * disk_maping_redis()
{
        disk_maping_t *ops = &__disk_maping__;

        return ops;
}
